之前有多语言consume kafka的需求,因此决定采用Confluent的Kafka Rest2.0.1。虽然测试的结果不如人意(java consumer大约60M/s,但用rest的话只有5M/s),考虑到消息量不大,也能凑合用
但最近在kafka集群中新创建了200个测试的空topic,rest consumer的性能急剧降低,惨不忍睹。。。
下载了源码debug了一下,发现每次consume都需要判断一下topic是否存在
1 | public List<Topic> getTopics() { |
MetadataObserver.getTopics()耗时非常长,该函数先从zookeeper中取出所有的topicName,然后再到zookeeper中把所有的topic的metaData查询出来封装成Topic对象然后返回。增加了200个topic之后,获取所有topic的metaData变得非常的耗时间,其实这是完全没有必要的,直接将Seq
因此代码改成
1 | public Collection<String> getTopicNames() { |
那么,rest consumer为什么每次消费都需要先判断Topic是否存在呢?
rest是无状态的,但kafka consumer必须要保存状态,因此必须在rest服务器端缓存consumer instances,每个consume rest请求都是通过缓存的instance向kafka server请求数据。
rest consumer为了限制每次请求返回的数据量,避免客户端内存爆掉,提供了consumer.request.timeout.ms参数,默认值1000ms,意味着无论instance是否从kafka server端拿到了数据,1秒钟后必须给rest 客户端响应,如果没拿到数据,就返回空数组。
那么问题来了,假如topic不存在,consumer instance会获取topic的metadata超时,但在超时之前,就已经超过了1秒,空数组会被返回给rest client。事实上,rest client的每次消费请求都会返回空数组,但它只知道没有拿到数据,并不知道是topic不存在,还是真的没有数据被produce进来。
为了避免这种情况发生,必须处理每个consume请求时先判断topic是否存在。
顺带说一句,java client也应该提供判断topic是否存在的api啊。。。不能让client等这么久最后返回个failed to update metadata after 60000ms…能不能友好一点。。
———————————- 我是分割线 ——————————————–
注:该BUG在3.0版本已被官方修复。